﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OF.Notify.DataHost.Cluster.Entity;
using OF.Notify.DataHost;
using OF.Notify.DataHost.Cluster;
using OF.Notify.Master.Zookeeper;
using OF.DistributeService.Core.Common;
using OF.Notify.Entity;

namespace OF.Notify.Master
{
    public class ClusterMaster
    {
        internal static readonly List<short> EmptyList = new List<short>();

        public bool isDisposed = false;

        internal Thread refreshClusterThread = null;
        internal AutoResetEvent refreshClusterEvent = new AutoResetEvent(false);

        
        public ConcurrentDictionary<short, DataNodeProxy> dataNodeProxyDict = null;
        internal ConcurrentQueue<List<DataNodeProxy>> newOnlineNodesQueue = new ConcurrentQueue<List<DataNodeProxy>>();

        internal TopicInstanceProvider<TopicClusterMaster> topicClusterMasterProvider = null;

        public bool IsNodeOffLine(short nodeId)
        {
            return !dataNodeProxyDict.ContainsKey(nodeId);
        }

        internal List<short> GetNoClusterNodes()
        {
            List<short> noClusterNodes = new List<short>();
            foreach(short nodeId in dataNodeProxyDict.Keys.ToList())
            {
                bool isInCluster = false;
                foreach (byte topicEnum in ClusterContext.Get().GetAllSupportedTopic())
                {
                    if (topicClusterMasterProvider.GetInstance(topicEnum).IsNodeInActiveCluster(nodeId))
                    {
                        isInCluster = true;
                        break;
                    }
                }
                if (!isInCluster)
                {
                    noClusterNodes.Add(nodeId);
                }
            }
            return noClusterNodes;
        }

        public List<short> GetOnLineNodeIds()
        {
            if (dataNodeProxyDict == null)
            {
                return EmptyList;
            }
            else
            {
                return dataNodeProxyDict.Keys.ToList();
            }
        }

        public void DoDispose()
        {
            isDisposed = true;
        }

        public static bool IsValidateId(int id)
        {
            if (id >= 0)
            {
                return true;
            }
            return false;
        }

        internal void InitAllTopicClusterMaster(ZooKeeperProxy zk)
        {
            var supportedTopicList = ClusterContext.Get().GetAllSupportedTopic();
            Dictionary<byte, TopicClusterMaster> topicClusterMasterDict = new Dictionary<byte, TopicClusterMaster>(supportedTopicList.Count);
            foreach (var topic in supportedTopicList)
            {
                topicClusterMasterDict.Add(topic, new TopicClusterMaster(this, zk, topic));
            }
            topicClusterMasterProvider = new TopicInstanceProvider<TopicClusterMaster>(topicClusterMasterDict);
        }

        internal void DoDisposeNodeOnInstantProcessCollections(short nodeId)
        {
            var supportedTopicList = ClusterContext.Get().GetAllSupportedTopic();
            foreach (var topic in supportedTopicList)
            {
                topicClusterMasterProvider.GetInstance(topic).DoDisposeNodeOnInstantProcessCollections(nodeId);
            }
        }

        public ClusterMaster(ZooKeeperProxy zk, List<DataNodeProxy> newOnlineDataNodeList)
        {
            var context = ClusterContext.Get();
            InitAllTopicClusterMaster(zk);
            
            refreshClusterThread = new Thread(new ThreadStart(() =>
            {
                this.Restore(newOnlineDataNodeList);
            }));
            refreshClusterThread.Start();
        }

        public AllNodesRuntimeData GetAllRuntimeData(byte topicEnum, List<byte> consumerEnumList)
        {
            return topicClusterMasterProvider.GetInstance(topicEnum).GetAllRuntimeData(consumerEnumList);
        }

        public void MockDisposeAllCluster()
        {
            foreach (var topicEnum in ClusterContext.Get().GetAllSupportedTopic())
            {
                topicClusterMasterProvider.GetInstance(topicEnum).MockDisposeAllCluster();
            }
        }
        
        internal void Restore(List<DataNodeProxy> newDataNodeList)
        {
            try
            {
                if (isDisposed)
                {
                    return;
                }
                dataNodeProxyDict = new ConcurrentDictionary<short, DataNodeProxy>(newDataNodeList.Select(node => new KeyValuePair<short, DataNodeProxy>(node.GetId(), node)));
                List<short> nodeIdList = newDataNodeList.Select(node => node.GetId()).ToList();
                this.AddOnlineNode(nodeIdList);
                while (!isDisposed)
                {
                    try
                    {
                        refreshClusterEvent.WaitOne();
                        List<DataNodeProxy> dataNodeList = null;
                        List<DataNodeProxy> lastDataNodeList = null;
                        while (newOnlineNodesQueue.TryDequeue(out dataNodeList))
                        {
                            if (isDisposed)
                            {
                                return;
                            }
                            lastDataNodeList = dataNodeList;
                        }

                        if (isDisposed)
                        {
                            return;
                        }

                        if (lastDataNodeList != null)
                        {
                            UpdateClusterNodes(lastDataNodeList);
                        }
                    }
                    catch (Exception ex2)
                    {
                        Util.LogException("Restore Core", ex2);
                        if (isDisposed)
                        {
                            return;
                        }
                        Thread.Sleep(20);
                    }
                }
            }
            catch (Exception ex)
            {
                Util.LogException("Restore", ex);
                if (isDisposed)
                {
                    return;
                }
                throw;
            }
        }



        internal void UpdateClusterNodes(List<DataNodeProxy> newDataNodeList)
        {
            if (isDisposed)
            {
                return;
            }
            List<DataNodeProxy> toAddList = newDataNodeList.Where(newDataNode => !this.dataNodeProxyDict.ContainsKey(newDataNode.GetId())).ToList();
            List<short> toDeleteList = dataNodeProxyDict.Values.Where(oldNode => newDataNodeList.FirstOrDefault(newNode => newNode.GetId() == oldNode.GetId()) == null).Select(node => node.GetId()).ToList();
            UpdateClusterNodesCore(toAddList, toDeleteList);
        }

        public DataNodeProxy GetProxy(short nodeId)
        {
            DataNodeProxy proxy = null;
            dataNodeProxyDict.TryGetValue(nodeId, out proxy);
            return proxy;
        }

        internal void UpdateClusterNodesCore(List<DataNodeProxy> nodeAddList, List<short> nodeRemoveList)
        {
            if (isDisposed)
            {
                return;
            }

            foreach (var dataNode in nodeAddList)
            {
                dataNodeProxyDict.TryAdd(dataNode.GetId(), dataNode);
            }
            AddOnlineNode(nodeAddList.Select(node => node.GetId()).ToList());
            foreach (var id in nodeRemoveList)
            {
				DataNodeProxy deleteNode = null;
                dataNodeProxyDict.TryRemove(id, out deleteNode);
            }
			
			foreach (var id in nodeRemoveList)
			{
				RemoveOnlineNode(id);
			}
        }
		
        public void NotifyClusterChanged(List<DataNodeProxy> newDataNodeList)
        {
            newOnlineNodesQueue.Enqueue(newDataNodeList);
            this.refreshClusterEvent.Set();
        }

        internal void BuildCluster()
        {
            var context = ClusterContext.Get();
            List<List<short>> nodeListList = new List<List<short>>();
            lock (this)
            {
                List<short> noClusterNodes = GetNoClusterNodes();
                while (noClusterNodes.Count >= context.RongYu)
                {
                    List<short> nodeList = new List<short>(context.RongYu);
                    for (int i1 = 0; i1 < context.RongYu; i1++)
                    {
                        short node = -1;
                        node = noClusterNodes[0];
                        noClusterNodes.RemoveAt(0);
                        nodeList.Add(node);
                    }
                    nodeList.Sort((a, b) =>
                    {
                        if (a > b)
                        {
                            return 1;
                        }
                        else if (a == b)
                        {
                            return 0;
                        }
                        else
                        {
                            return -1;
                        }
                    });
                    nodeListList.Add(nodeList);
                }
            }
            foreach (var nodeList in nodeListList)
            {
                foreach (var topicEnum in ClusterContext.Get().GetAllSupportedTopic())
                {
                    if (isDisposed)
                    {
                        return;
                    }
					//Util.LogInfo(topicEnum + " AssignTopicCluster " + string.Join(" ", nodeList));
                    topicClusterMasterProvider.GetInstance(topicEnum).AssignTopicCluster(nodeList);
                }
            }
        }

        public void AddOnlineNode(List<short> nodeIdList)
        {
            //Util.LogInfo("AddOnlineNode:" + string.Join(" ", nodeIdList));
            Parallel.ForEach(nodeIdList, ClusterContext.GetParallelOptions(), (nodeId) => {
                foreach (var topicEnum in ClusterContext.Get().GetAllSupportedTopic())
                {
                    if (isDisposed)
                    {
                        return;
                    }
                    topicClusterMasterProvider.GetInstance(topicEnum).MergeProcessTree(nodeId);
                }
            });
            if (isDisposed)
            {
                return;
            }
            BuildCluster();
            foreach (var topicEnum in ClusterContext.Get().GetAllSupportedTopic())
            {
                if (isDisposed)
                {
                    return;
                }
                topicClusterMasterProvider.GetInstance(topicEnum).MultiSendSyncProcessedTree();
            }
        }

        public void RemoveOnlineNode(short id)
        {
            this.DoDisposeNodeOnInstantProcessCollections(id);
            foreach (var topicEnum in ClusterContext.Get().GetAllSupportedTopic())
            {
                if (isDisposed)
                {
                    return;
                }
                topicClusterMasterProvider.GetInstance(topicEnum).RemoveOnlineNode(id);
            }
            BuildCluster();
            if (isDisposed)
            {
                return;
            }

            foreach (var topicEnum in ClusterContext.Get().GetAllSupportedTopic())
            {
                if (isDisposed)
                {
                    return;
                }
                topicClusterMasterProvider.GetInstance(topicEnum).MultiSendSyncProcessedTree();
            }
        }
		
		public CallServiceResult<bool> DeliverMessage(DeliverMessageRequest request)
		{
            return topicClusterMasterProvider.GetInstance(request.topicEnum).DeliverMessage(request);
		}
		
		public void OnInstantCollectionFinished(byte topicEnum, byte consumerEnum, bool isSuccessed, int clusterId, int collectionId)
		{
            topicClusterMasterProvider.GetInstance(topicEnum).OnInstantCollectionFinished(consumerEnum, isSuccessed, clusterId, collectionId);
		}
    } 
}
